#!/usr/bin/perl

use strict;
use Getopt::Long;
use Safe;
use Data::Dumper;
use Socket;
use Carp;
use POSIX ":sys_wait_h";

use Pod::Usage;

# Version string: '0.9.' followed by the second word of the Revision keyword
# below (the qw$...$ list is "Revision:", "24").
our $VERSION = '0.9.' . [qw$Revision: 24 $]->[1];

# in-memory stats
my %history;				# history for average/stdev, keyed by "grid:cluster:host" (filled by metric_sum)
my $window = 200;			# window-size for average/stdev
my $curdex = 0;				# history double-buffer index

my $readlast=0;				# last time config was read (mtime of the config file, set by readconf)
my $alertm = 0;				# first alert-count modify time; 0 means no report is pending (see msgq)

my %opt;				# command-line options
$opt{conf} = "/etc/ganglia-alert.conf";

my %default;				# default config values

# readconf copies these into %conf before parsing the config file
$default{pid_file} = "/var/run/ganglia-alert.pid";
$default{digest_secs} = 30;
$default{sleep_secs} = 5;
$default{group_by} = 'host';

# banner passed to pod2usage as the usage message
my $prog = "ganglia-alert $VERSION\n";

# Parse the command line into %opt; on a bad option print usage and exit 2.
GetOptions(\%opt, "daemon", "email_to|email=s", "conf|config=s", "gmetad_server|server=s", "log_file|log=s", "override=s@", "kill", "help", "report") 
	|| pod2usage(-message=>$prog, -exit=>2);

if ($opt{help}) {
	pod2usage(-message=>$prog, -exit=>0, -verbose=>99, -sections=>"SYNOPSIS|DESCRIPTION|CONFIG")
}

# %conf      effective configuration (defaults, config file, command-line overrides)
# %alcnt     per alert name and host key: trigger count plus first/last trigger time
# %alconf    alert definitions parsed from the config file
# %exec_pids pid => command for children forked by eval_alert, reaped by waitpids
my (%conf, %alcnt, %alconf, %exec_pids);

# login name of the real user id ($<); used by msgq for the default From address
my $uid = getpwuid($<);

# readconf returns false when the config file cannot be opened
if (!readconf()) {
	#TODO: Maybe create a simple default config for the user, and use that, rather than being rude
	die("Can't read $opt{conf} : $!\n");
}

# Pid of a previously started daemon, read from the pid file; stays 0 when
# the file is missing or does not hold a usable pid.
my $pid = 0;

if (open(my $pidfh, '<', $conf{pid_file})) {
	my $line = <$pidfh>;
	close $pidfh;
	# Accept only a positive decimal pid.  Anything else would numify to 0,
	# and kill() with pid 0 signals the caller's whole process group.
	$pid = $1 if defined($line) && $line =~ /^\s*([1-9]\d*)\s*$/;
}

if ($opt{kill} || $opt{report}) {
	# --kill stops the daemon (SIGINT), --report asks it for a report (SIGHUP)
	die "No pid in '$conf{pid_file}' to signal.\n" if (!$pid);
	my $sig = $opt{kill} ? 'INT' : 'HUP';
	# kill() returns the number of processes signalled; map that onto the
	# usual shell convention: 0 on success, 1 on failure.
	exit(kill($sig, $pid) > 0 ? 0 : 1);
}

# Main entry: either detach and poll forever (--daemon), or poll once,
# send any resulting report immediately, and exit.
if ($opt{daemon}) {
	# start daemon
	if ($pid) {						# already running?
		if (kill(0, $pid)) {
			die "Already running ($pid), not starting twice\n";
		}
	}

	# fork returns undef on failure; without this check the foreground
	# process would take the child branch and daemonize itself.
	my $child = fork;
	die "Can't fork daemon: $!\n" if !defined($child);

	if (!$child) {						# daemonize (child side)
		my $pidfile = $conf{pid_file};

		open(my $pidfh, '>', $pidfile) || die "Can't open pidfile '$pidfile' : $!";
		print $pidfh $$;
		close $pidfh;					# save pid

		open STDIN,  '</dev/null';
		open STDOUT, ">/dev/null";
		# keep STDERR attached only when logging goes to standard output
		if ($conf{log_file} && !($conf{log_file} eq '-')) {
			open STDERR, '>&STDOUT';
		}
		chdir '/';
		POSIX::setsid();

		# OK, startup happened, from now on, all messages are reported via xlog... and are nonfatal
		xlog("info", "starting daemon");
		my $int;
		# todo... change to a simple UDP protocol?  signals can be wonky
		$SIG{INT}=sub {$int=1;};			# interrupt handler
		$SIG{TERM} = $SIG{INT};
		$SIG{HUP}=sub {$alertm=1;};			# sig-HUP forces mail with current state todo: maybe you just want it in a file
		while(!$int) {
			# each step is isolated so one failure does not stop the loop
			eval {dometa()};
			eval {msgq()};
			eval {waitpids()};
			my $secs = $conf{sleep_secs} > 0 ? $conf{sleep_secs} : 5;
			sleep($secs);		# snooze
		}
		unlink $conf{pid_file};
		xlog("info", "stopping daemon");
	}
} else {
	# run once, send emails, exit
	dometa();
	$conf{digest_secs} = -1;			# negative digest window: msgq sends right away
	msgq();
}

# Reap every child that has already exited, without blocking.  A non-zero
# wait status is logged together with the command remembered in %exec_pids,
# and the pid is then forgotten.
sub waitpids {
	while (1) {
		my $reaped = waitpid(-1, WNOHANG);
		last if !($reaped > 0);
		xlog("error", "Error executing '$exec_pids{$reaped}' : $?") if $?;
		delete $exec_pids{$reaped};
	}
}

# Poll gmetad once: re-read the config if it changed, fetch the XML dump
# from the configured gmetad_server (default localhost:8651), and evaluate
# every configured alert against every host found in it.
# Failures are reported through fail(), which logs and croaks.
sub dometa {
	readconf();
	my ($host,$port) = $conf{"gmetad_server"} =~ /^\s*([^:]+):?(\d+)?/;
	$host = "localhost" if !$host;
	$port = 8651 if !$port;
	my $iaddr = inet_aton($host) || fail("Can't connect to: $host\n");

	my $paddr  = sockaddr_in($port, $iaddr);

	# Lexical handle: it is closed automatically when fail() croaks out of
	# this sub, so a failed connect does not leave a descriptor open.
	socket(my $sock, PF_INET, SOCK_STREAM, getprotobyname('tcp')) || fail("Can't create socket: $!");
	connect($sock, $paddr) || fail("Can't connect: $!");

	# gmetad writes the whole document and closes the connection
	my $dat = '';
	while (defined(my $line = <$sock>)) {
		$dat.=$line;
	}
	close $sock;
	$dat = xtag($dat);			# rudimentary parse
	$dat = $dat->{ganglia_xml}->[0];	# top tag, one entry

	for my $grid (keys(%{$dat->{grid}})) {
		my $gdat = $dat->{grid}->{$grid};
		for my $cluster (keys(%{$gdat->{cluster}})) {
			my $cdat = $gdat->{cluster}->{$cluster};
			for my $hname (keys(%{$cdat->{host}})) {
				my $hdat = $cdat->{host}->{$hname};
				my $key = "$grid:$cluster:$hname";	# identifies the host in %history and %alcnt
				metric_sum($key, $hdat);
				my $safe = safe_from_hdat($key, $hdat);
				for my $alert (values %alconf) {
					eval_alert($key, $hdat, $alert, $safe);
				}
			}
		}
	}
}

# Fold one sample of every non-string metric of a host into the running
# sum, sum of squares and sample count kept in the current history buffer.
#   $key   host identifier used as the %history key
#   $hdat  parsed host record; metrics live under $hdat->{metric}
# Once a host's buffer has received $window samples it is copied to the
# other slot, the current slot is emptied and $curdex is flipped.
# NOTE(review): $curdex is one index shared by all hosts while the sample
# counter is per host -- confirm that flipping it for everyone is intended.
sub metric_sum {
	my ($key, $hdat) = @_;
	for my $metric (keys(%{$hdat->{metric}})) {
		my $sample = $hdat->{metric}->{$metric};
		next if $sample->{type} eq 'string';
		my $acc = ($history{$key}->[$curdex]->{$metric} ||= {});
		$acc->{ssq} += $sample->{val} * $sample->{val};
		$acc->{sum} += $sample->{val};
		$acc->{cnt} += 1;
	}
	my $slot = ($history{$key}->[$curdex] ||= {});
	if (++$slot->{_cnt} >= $window) {
		my $other = 1-$curdex;
		$history{$key}->[$other] = $slot;
		$history{$key}->[$curdex] = {};
		$curdex = $other;
	}
}

# Build a Safe compartment holding one host's data, for evaluating alert
# expressions.
#   $key   host identifier used as the %history key
#   $hdat  parsed host record
# For every metric NAME the compartment gets $NAME (current value),
# $NAME_tn, $NAME_avg and $NAME_stdev; it also gets $reported and $tn from
# the host record.  Returns the Safe object.
sub safe_from_hdat {
	my ($key, $hdat) = @_;
	my $safe = Safe->new;
	for my $metric (keys(%{$hdat->{metric}})) {
		my $sample = $hdat->{metric}->{$metric};
		${$safe->varglob($metric)} = $sample->{val};
		${$safe->varglob($metric . "_tn")} = $sample->{tn};

		# average/stdev stay 0 until history exists for this metric
		my ($avg, $stdev) = (0, 0);
		my $hist = $history{$key}->[$curdex]->{$metric};
		if ($hist && $hist->{cnt} > 0) {
			my ($sum, $cnt, $ssq) = ($hist->{sum}, $hist->{cnt}, $hist->{ssq});
			$avg = $sum/$cnt;
			if ($cnt > 5) {
				# Sample variance from the running sums.  Rounding can leave
				# it a hair below zero for near-constant values, and sqrt()
				# of a negative number is fatal, so clamp it first.
				my $var = ($cnt*$ssq - $sum*$sum) / ($cnt*($cnt-1));
				$var = 0 if $var < 0;
				$stdev = sqrt($var);
			} else {
				# with five or fewer samples the average is used in place
				# of a standard deviation
				$stdev = $avg;
			}
		}
		${$safe->varglob($metric . "_avg")} = $avg;
		${$safe->varglob($metric . "_stdev")} = $stdev;
	}
	${$safe->varglob("reported")}=$hdat->{reported};
	${$safe->varglob("tn")}=$hdat->{tn};
	return $safe;
}

# Evaluate one alert definition against one host and update %alcnt.
#   $key    host identifier ("grid:cluster:host")
#   $hdat   parsed host record ($hdat->{name} is matched against host prefixes)
#   $alert  alert definition from %alconf
#   $safe   Safe compartment built by safe_from_hdat for this host
# Returns the value of the alert expression, or nothing when no expression
# applies to this host.  An expression that fails to evaluate is reported
# through fail(), which logs and croaks.
sub eval_alert {
	my ($key, $hdat, $alert, $safe) = @_;

	my ($name, $expr, $lev, $cmd, $cmd_lev) = ($alert->{name}, $alert->{expr}, $alert->{level}, $alert->{cmd}, $alert->{cmd_level});

	# Host-specific definitions replace the general one when the configured
	# prefix matches the host name.  Prefixes are visited shortest first so
	# that the longest (most specific) match is the one that sticks.
	for my $prefix (sort { length($a) <=> length($b) || $a cmp $b } keys(%{$alert->{host}})) {
		next if $hdat->{name} !~ /^\Q$prefix/;
		my $over = $alert->{host}->{$prefix};
		($expr, $lev, $cmd, $cmd_lev) = ($over->{expr}, $over->{level}, $over->{cmd}, $over->{cmd_level});
	}

	return if !$expr;

	# evaluate expression in context of metric data
	my $ret = $safe->reval($expr);

	if ($@) {
		fail("Can't evaluate '$expr': $@");
	}

	if ($ret) {										# alert triggered
		$alcnt{$name}->{$key}->{cnt}++;
		my $state = $alcnt{$name}->{$key};
		if ($cmd_lev && $state->{cnt} == $cmd_lev) {
			# expand metric variables inside the command string
			my $run = $safe->reval("\"$cmd\"");
			if ($@ || !defined($run) || $run eq '') {
				xlog("error", "Can't expand exec '$cmd' for $name: $@");
			} else {
				my $pid = fork;
				if (!defined($pid)) {
					xlog("error", "Can't fork for exec on $name: $!");
				} elsif (!$pid) {
					# child: exec only returns on failure
					exec($run) or xlog("error", "Error running '$run' : $!");
					# leave without running the parent's cleanup, and with a
					# non-zero status so waitpids sees the failure
					POSIX::_exit(1);
				} else {
					# keep cmd, so after a failed wait, you can log what failed
					$exec_pids{$pid} = $run;
				}
			}
		}
		# passed thresh for first time?  send an email
		if ($state->{cnt} == $lev) {
			$alertm = time();							# email first needed flag
			xlog('alert', $key, $name);
		}
		$state->{last} = time();							# last time alert was found
		$state->{first} = time() if $state->{cnt} == 1;					# first time alert was found
	} else {
		# clear all alerts on an OK signal... this can lead to a "flicker-brownout" if the level is > 1
		if ($alcnt{$name} && $alcnt{$name}->{$key}) {
			delete $alcnt{$name}->{$key};
			delete $alcnt{$name} if 0 == scalar keys%{$alcnt{$name}};
		}
	}
	return $ret;
}

# Rudimentary XML-to-hash parser, sufficient for gmetad output.
#   $dat  XML text
# Returns a hash ref keyed by lower-cased tag name.  Elements that carry a
# non-empty "name" attribute are stored as {tag}{name} = \%element; all
# others are pushed onto the array {tag}.  Each element hash holds its
# attributes (lower-cased keys) merged with the parsed children.
# Limitations: same-named elements nested inside each other are not
# matched up correctly, and text content is ignored.
sub xtag {
	my ($dat) = @_;
	my %r;
	# $1 "/" for a closing tag, $2 tag name, $3 attribute text, $4 "/" for a
	# self-closing tag.  The attribute part is optional and lazy so that
	# attribute-less tags keep their full name and the trailing "/" of
	# "<x .../>" ends up in $4 instead of in the attributes.
	while ($dat =~ s/<(\/?)([^\s<>\/][^\s<>]*?)((?:\s[^<>]*?)?)(\/?)>//s) {
		my ($end, $tag, $attrs, $close) = ($1, lc($2), $3, $4);
		next if $end;				# stray closing tag: drop it
		next if $tag =~ /^[?!]/;		# declarations, DTD, comments

		# attributes: double-quoted, single-quoted or bare values
		my %vals;
		while ($attrs =~ m/(\w+)\s*=\s*(?:"([^"]*)"|'([^']*)'|([^\s"'<>]+))/g) {
			$vals{lc($1)} = defined($2) ? $2 : defined($3) ? $3 : $4;
		}

		# An open tag owns everything up to its closing tag; parse that
		# recursively and merge the children into this element.
		if (!$close && $dat =~ s/^(.*?)<\/\Q$tag\E>//si) {
			my $kids = xtag($1);
			$vals{$_} = $kids->{$_} for keys(%$kids);
		}

		if ($vals{name}) {
			$r{$tag}->{$vals{name}} = \%vals;
		} else {
			push @{$r{$tag}}, \%vals;
		}
	}
	return \%r;
}

# Alert definitions created by the most recent alert line; a following
# "exec" line attaches its command to all of them.
my @prev_alerts;

# Parse one config file into %conf and %alconf.
#   $conf  path of the file to read
#   $host  space-delimited host list that alerts are currently bound to
#          (passed on to included files; may be undefined)
# Lines look like "name[/N]: value"; lines starting with '#' are comments.
# Returns 1 on success, undef when the file cannot be opened.
sub _readconf {
	my ($conf, $host) = @_;
	# Lexical handle: an "include" recurses into this sub, and a shared
	# bareword handle would be closed by the nested call, silently dropping
	# the rest of the including file.
	my $in;
	if (!open($in, '<', $conf)) {
		xlog("error", "Failed to open config $conf\n");
		return undef;
	}
	while (defined(my $line = <$in>)) {
		next if $line =~ /^\s*#/;
		next if $line !~ /^\s*([^:]+?)\s*(?:\/(\d+))?\s*:\s*(.*?)\s*$/;
		my ($n, $mod, $v) = (lc($1), $2, $3);

		# expand ${variable} references from earlier "define" lines
		if ($conf{define}) {
			for my $var (keys %{$conf{define}}) {
				$v =~ s/\$\{\Q$var\E\}/$conf{define}->{$var}/g;
			}
		}

		if ($n eq 'include') {
			_readconf($v, $host);
		} elsif ($n eq 'define') {
			my ($dname, $dval) = $v =~ m/\s*([^=]+?)\s*=\s*(.*)/;
			if (defined($dname)) {
				$conf{define}->{$dname} = $dval;
			} else {
				xlog("error", "Invalid define ($v), expected variable=value");
			}
		} elsif ($n eq 'host') {
			$host = $v;
		} elsif ($n eq 'exec') {
			if (!@prev_alerts) {
				xlog("error", "Can't define an exec ($v) without a previous alert");
			} else {
				# exec/N overrides the count; default is the alert's own level
				$mod = $mod ? $mod : $prev_alerts[0]->{level};
				for my $al (@prev_alerts) {
					$al->{cmd_level} = $mod;
					$al->{cmd} = $v;
				}
			}
		} elsif ($n =~ m/^\!/ || $n eq 'alert') {
			@prev_alerts = ();
			# "!name: expr" names the alert; a plain "alert: expr" uses the
			# expression itself as the name
			my $k = $v;
			$k = $1 if ($n =~ m/^!(\w+)/);
			$mod = $mod ? $mod : 1;
			if ($host) {
				for my $h (split /\s+/, $host) {
					$alconf{$k}->{name} = $k;
					$alconf{$k}->{host}->{$h}->{expr} = $v;
					$alconf{$k}->{host}->{$h}->{level} = $mod;
					push @prev_alerts, $alconf{$k}->{host}->{$h};
				}
			} else {
				$alconf{$k}->{name} = $k;
				$alconf{$k}->{expr} = $v;
				$alconf{$k}->{level} = $mod;
				push @prev_alerts, $alconf{$k};
			}
		} else {
			# anything else is a plain config variable
			$conf{$n} = $v;
		}
	}
	close $in;
	return 1;
}

# log_file setting carried over from the previous configuration
my $save_log;

# (Re)load the configuration when the config file's mtime has changed.
# Resets %conf to the defaults, parses the file, applies command-line
# overrides, picks a mail transport, and drops alert state that no longer
# has a definition.
# Returns 1 when the config was (re)read, and nothing when the file is
# unchanged or cannot be opened.  An invalid --override is reported
# through fail(), which logs and croaks.
sub readconf {
	my $modtime = (stat($opt{conf}))[9];			# todo, maybe sig(HUP) here...instead of polling stat all day?  but I like it this way
	return if $readlast == $modtime;

	# make sure the file is readable before throwing the old config away
	{
		open(my $probe, '<', $opt{conf}) || return;
		close $probe;
	}

	# save logfile
	$save_log = $conf{log_file};

	%conf = %default;

	# Forget the old alert definitions; otherwise alerts removed from the
	# file would keep firing and the clean-up of %alcnt below could never
	# find anything to remove.
	%alconf = ();
	@prev_alerts = ();

	# set log file from options or from the previous setting before the xlog command below
	$conf{log_file} = $opt{log_file} ? $opt{log_file} : $save_log ? $save_log : '';

	xlog("info", "Reading config: $opt{conf}") if ($opt{daemon});

	_readconf($opt{conf});

	# override config from command-line
	for my $name (qw(gmetad_server log_file email_to)) {
		$conf{$name} = $opt{$name} if defined $opt{$name};
	}

	for my $ov (@{$opt{override}}) {
		if ($ov =~ /^\s*([a-z][^:]+?)\s*:\s*(.*?)\s*$/i) {
			# only support regular vars in command line
			my ($n, $v) = (lc($1), $2);
			$conf{$n} = $v;
		} else {
			# die if command line is wrong
			fail("Invalid override option: $ov");
		}
	}

	# Mail transport: an explicit sendmail_path wins, then MIME::Lite, then
	# whatever "which sendmail" finds; with none of them email is disabled.
	if ($conf{email_to} && !$conf{sendmail_path}) {
		eval {
			require MIME::Lite;
		};
		if ($@) {
			$conf{sendmail_path} = `which sendmail 2> /dev/null`;
			chomp $conf{sendmail_path};

			if (!$conf{sendmail_path}) {
				xlog("error", "Can't send email without MIME::Lite or sendmail installed, disabling email reports.");
				$conf{email_to} = '';
			}
		}
	}

	# remove alerts that are left-over from previous config
	for my $k (keys %alcnt) {
		delete $alcnt{$k} if !$alconf{$k};
	}

	$readlast = $modtime;
	return 1;
}

# Log the given message parts as an "error" entry, then croak with the
# same message.
sub fail {
	my @msg = @_;
	xlog("error", @msg);
	croak(@msg);
}

# Send the pending alert digest, if there is one and its digest window
# (digest_secs after the first new alert) has elapsed.
# The report lists every active alert from %alcnt, grouped by alert or by
# host according to group_by.  It is mailed to email_to through sendmail
# or MIME::Lite; an email_to of '-' only prints the subject to STDERR.
# $alertm is reset to 0 once the report is delivered (or there is no
# address to deliver to), so a failed delivery is retried on the next call.
sub msgq {
	return if !($alertm > 0 && $alertm < (time() - $conf{digest_secs}));

	my (%grid, %clus, %host);

	my $rpt = '';
	my $cnt = 0;			# rows in the report; starts at 0 so the subject never shows an empty count

	# one report row: label, trigger count, first time and, if different, last time
	my $row = sub {
		my ($label, $d) = @_;
		my $line = "\t" . $label . "\t" . $d->{cnt} . "\t" . scalar(localtime($d->{first}));
		$line .= "\t" . scalar(localtime($d->{last})) if $d->{first} != $d->{last};
		return $line . "\n";
	};

	for my $name (sort keys %alcnt) {
		$rpt .= "alert $name:\n" if $conf{group_by} eq 'alert';
		for my $key (sort keys %{$alcnt{$name}}) {
			my $d = $alcnt{$name}->{$key};
			next if !($d->{cnt} > 0);
			my ($grid, $cluster) = split(/:/,$key);
			++$grid{$grid};
			++$clus{"$grid:$cluster"};
			++$host{$key};
			if ($conf{group_by} eq 'alert') {
				(my $label = $key) =~ s/unspecified://;
				$rpt .= $row->($label, $d);
				++$cnt;
			}
		}
	}

	if ($conf{group_by} eq 'host') {
		for my $key (sort keys %host) {
			(my $label = $key) =~ s/^unspecified:?//;
			$rpt .= "host $label:\n";
			for my $name (sort keys %alcnt) {
				my $d = $alcnt{$name}->{$key};
				next if !($d && $d->{cnt} > 0);
				$rpt .= $row->($name, $d);
				++$cnt;
			}
		}
	}

	# this is all to get the most specific id
	my $id = 'Multiple Grids';
	if (keys(%grid)==1) {
		$id = (keys(%grid))[0];
		if (keys(%clus)==1) {
			$id = (keys(%clus))[0];
			if (keys(%host)==1) {
				$id = (keys(%host))[0];
			} else {
				$id .= ":Multiple Hosts";
			}
		} else {
			$id .= ":Multiple Clusters";
		}
		$id =~ s/^unspecified:?//;
	}

	$id = "All" if (keys(%grid)==0);

	# a configured subject is a template with $count and $id placeholders
	my $subject = $conf{subject};
	if (!$subject) {
		$subject = "$cnt alerts for $id";
	} else {
		$subject =~ s/\$count\b/$cnt/g;
		$subject =~ s/\$id\b/$id/g;
	}
	my $addr = $conf{email_to};
	xlog("info", "sending email", "$subject");

	if (defined($addr) && $addr eq '-') {
		# print, not printf: the subject may contain '%'
		print STDERR "Would have sent email with subject: $subject\n";
		$alertm = 0;
		return;
	}

	my $ok = 0;
	if ($conf{email_to}) {
		my $from = $conf{email_from} ? $conf{email_from} : ("$uid\@" . ($ENV{HOST} ? $ENV{HOST} : 'localhost'));
		if ($conf{sendmail_path}) {
			# NOTE(review): the command line is built from config values and
			# run through the shell; keep the config file trusted.
			$ok = open(my $sm, '|-', "$conf{sendmail_path} -f $from -t $addr");
			$ok = $ok && print($sm "From: $from\nTo: $addr\nSubject: $subject\n\n$rpt");
			$ok = $ok && close($sm);
		} else {
			my $msg = MIME::Lite->new(
				From     => $from,
				To       => $addr,
				Subject  => $subject,
				Type     => 'text/plain',
				Data     => $rpt
			);
			$ok = $msg->send;
		}
	} else {
		# no address configured: nothing to deliver, report is done
		$ok = 1;
	}

	if ($ok) {
		$alertm = 0;
	} else {
		xlog("error", "Failed to send mail using " . ($conf{sendmail_path} ? "sendmail" : "MIME::Lite") . "\n");
	}
}

# Write one log entry: a timestamp followed by the tab-joined arguments,
# with embedded newlines replaced by spaces.
# The entry is appended to the configured log_file, or printed to standard
# output when log_file is unset or '-'.  If the log file cannot be opened
# the entry goes to STDERR instead of being lost.
# Returns the formatted line.
sub xlog {
	my $m = join("\t", @_);
	$m =~ s/\n/ /g;
	my $line = scalar(localtime) . "\t" . $m . "\n";
	my $log = $conf{"log_file"};
	if ($log && ! ($log eq '-')) {
		if (open(my $fh, '>>', $log)) {
			print $fh $line;
			close $fh;
		} else {
			print STDERR $line;
		}
	} else {
		print $line;
	}
	return $line;
}

# Return the distinct values of the argument list (as strings, in no
# particular order).
sub uniq {
	my %seen;
	@seen{@_} = (1) x @_;
	return keys %seen;
}

__END__

=head1 NAME

B<ganglia-alert> - Use gmetad data to send email alerts

=head1 SYNOPSIS

B<ganglia-alert> [-d|-k] <options>

 -d|--daemon		start the alert daemon
 -k|--kill		kill the currently running daemon (-INT)
 -r|--report		send a report now (-HUP)
 -c|--conf FILE		config file location (/etc/ganglia-alert.conf)
 -h|--help		documentation

These options override config settings:

 -s|--server HOST:PORT	gmetad_server info (localhost:8651)
 -e|--email ADDR	email_to address (none)
 -l|--log FILE		log_file (standard output)

=head1 DESCRIPTION

Connects to a gmetad server, runs a bunch of tests on the data contained
therein, and emails someone if one or more of those tests return
a nonzero result.

Simple monitoring on a cluster.

=head1 CONFIG

The config file (by default /etc/ganglia-alert.conf) controls the 
operation of the alert daemon.  It contains a list of config variables, 
and a list of alerts.

Variables are formatted as "name: value" in the config file.

Config variables (default, if any, is in parenthesis):

 email_to	comma-delimited list of email addresses to send alert reports to
 email_from	from-address for email (userid@HOST)
 sendmail_path	path to sendmail, if MIME::Lite is not installed
 subject	subject template for email; $count and $id are substituted ($count alerts for $id)
 group_by	either 'host' or 'alert', for reporting (host) 
 log_file	logfile used to write first-alerts, and daemon info to (standard output) 
 pid_file	pidfile to write pid to for daemon-mode (/var/run/ganglia-alert.pid) 
 gmetad_server	gmetad server to connect to (localhost:8651)
 digest_secs	number of seconds to spool alerts before sending an email (30)
 sleep_secs 	number of seconds to sleep before polling gmetad (5) 
 include	include another config file
 define		variable=value, replaces instances of ${variable} with value in all
		subsequent config file directives

Alerts & actions:

 alert[/N]	defines an alert activated when 'N' consecutive evaluations of the 
		associated expression are nonzero

 host		space-delimited list of hosts that future alerts are bound to
		default is to bind alerts to all hosts

 exec[/N]	associated with the immediately previous alert definition.  will 
		execute the command (via exec()), if the alert is triggered

Example configuration:

 email_to: earonesty@example.com
 log_file: /var/log/ganglia-alert.log
 alert: $disk_free < .05 * $disk_total
 alert/3: $load_one > $cpu_num * 2
 alert: $load_one_tn > 5000
 exec/3: ssh $host /etc/init.d/ganglia-monitor restart

=head1 AVAILABILITY

Project website: L<http://ganglia-alert.googlecode.com/>, latest version: L<http://ganglia-alert.googlecode.com/svn/trunk/ganglia-alert>

=head1 COREQUISITES

C<MIME::Lite> module to send alerts.

=head1 SCRIPT CATEGORIES

UNIX : System_administration

=head1 AUTHOR

Copyright 2011 Erik Aronesty C<earonesty@cpan.org>

=head1 LICENSE

This program is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.

See L<http://www.perl.com/perl/misc/Artistic.html>.

=cut

=head1 

